import 'dart:async';

import 'package:async/async.dart';
import 'package:async_test/src/utils.dart';

import '../common/test_page.dart';

class StreamQueueTestPage extends TestPage {
  StreamQueueTestPage(super.title) {
    group('StreamQueue<int>(StreamController<int>().stream)', () {
      test('StreamQueue<int>() 在第一个请求时被侦听，在两个请求之间暂停', () async {
        var controller = StreamController<int>();
        var events = StreamQueue<int>(controller.stream);
        await flushMicrotasks();
        expect(controller.hasListener);

        var next = events.next;
        expect(controller.hasListener);
        expect(controller.isPaused);

        controller.add(1);

        expect(await next);
        expect(controller.hasListener);
        expect(controller.isPaused);

        next = events.next;
        expect(controller.hasListener);
        expect(controller.isPaused);

        controller.add(2);

        expect(await next);
        expect(controller.hasListener);
        expect(controller.isPaused);

        events.cancel();
        expect(controller.hasListener);
      });
    });

    group('StreamQueue<int>().eventsDispatched事件已处理', () {
      test('StreamQueue<int>().eventsDispatched 下一个未来完成后的增量', () async {
        var events = StreamQueue<int>(createStream());

        expect(events.eventsDispatched);
        await flushMicrotasks();
        expect(events.eventsDispatched);

        var next = events.next;
        expect(events.eventsDispatched);

        await next;
        expect(events.eventsDispatched);

        await events.next;
        expect(events.eventsDispatched);
      });

      test('StreamQueue<int>().take多次递增多值请求', () async {
        var events = StreamQueue<int>(createStream());
        await events.take(3);
        expect(events.eventsDispatched);
      });

      test('StreamQueue<int>().withTransaction()为已接受的事务增加多次', () async {
        var events = StreamQueue<int>(createStream());
        await events.withTransaction((queue) async {
          await queue.next;
          await queue.next;
          return true;
        });
        expect(events.eventsDispatched);
      });

      test("StreamQueue<int>()..toList()rest不增加休息请求", () async {
        var events = StreamQueue<int>(createStream());
        await events.rest.toList();
        expect(events.eventsDispatched);
      });
    });

    group('StreamQueue<int>().lookAhead()前瞻性操作', () {
      test('StreamQueue<int>().lookAhead() 作为简单的事件列表', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.lookAhead(4));
        expect(await events.next);
        expect(await events.lookAhead(2));
        expect(await events.take(2));
        expect(await events.next);
        await events.cancel();
      });

      test('共0个事件', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.lookAhead(0));
        expect(await events.next);
        expect(await events.lookAhead(0));
        expect(await events.next);
        expect(await events.lookAhead(0));
        expect(await events.next);
        expect(await events.lookAhead(0));
        expect(await events.next);
        expect(await events.lookAhead(0));
        expect(await events.lookAhead(5));
        expect(events.next);
        await events.cancel();
      });

      test('带有错误论点的throws', () async {
        var events = StreamQueue<int>(createStream());
        expect(() => events.lookAhead(-1));
        expect(await events.next);
        expect(() => events.lookAhead(-1));
        expect(await events.next);
        await events.cancel();
      });

      test('太多的争论', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.lookAhead(6));
        await events.cancel();
      });

      test('过大', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.next);
        expect(await events.lookAhead(6));
        await events.cancel();
      });

      test('错误', () async {
        var events = StreamQueue<int>(createErrorStream());
        expect(events.lookAhead(4));
        expect(events.take(4));
        expect(await events.next);
        await events.cancel();
      });
    });

    group('StreamQueue<int>().next下一步操作', () {
      test('StreamQueue<int>().next 简单的请求序列', () async {
        var events = StreamQueue<int>(createStream());
        for (var i = 1; i <= 4; i++) {
          expect(await events.next);
        }
        expect(events.next);
      });

      test('同时有多个请求', () async {
        var events = StreamQueue<int>(createStream());
        var result = await Future.wait(
            [events.next, events.next, events.next, events.next]);
        expect(result);
        await events.cancel();
      });

      test('有错误的请求序列', () async {
        var events = StreamQueue<int>(createErrorStream());
        expect(await events.next);
        expect(await events.next);
        expect(events.next);
        expect(await events.next);
        await events.cancel();
      });
    });

    group('StreamQueue<int>().skip()跳过操作', () {
      test('StreamQueue<int>().skip() 序列中间的两个元素', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.skip(2));
        expect(await events.next);
        await events.cancel();
      });

      test('带有否定/错误参数的throws', () async {
        var events = StreamQueue<int>(createStream());
        expect(() => events.skip(-1));
        expect(await events.next);
        expect(() => events.skip(-1));
        expect(await events.next);
        await events.cancel();
      });

      test('共0个元素作品', () async {
        var events = StreamQueue<int>(createStream());
        expect(events.skip(0));
        expect(events.next);
        expect(events.skip(0));
        expect(events.next);
        expect(events.skip(0));
        expect(events.next);
        expect(events.skip(0));
        expect(events.next);
        expect(events.skip(0));
        expect(events.skip(5));
        expect(events.next);
        await events.cancel();
      });

      test('在流开始时结束的事件太多', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.skip(6));
        await events.cancel();
      });

      test('在一些事件之后发生了太多的事件', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.next);
        expect(await events.skip(6));
        await events.cancel();
      });

      test('在流结束时结束的事件太多', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.next);
        expect(await events.next);
        expect(await events.next);
        expect(await events.skip(2));
        await events.cancel();
      });

      test('发生错误的事件数', () async {
        var events = StreamQueue<int>(createErrorStream());
        expect(events.skip(4));
        expect(await events.next);
        await events.cancel();
      });

      test('个事件中有错误，之后再次跳过', () async {
        var events = StreamQueue<int>(createErrorStream());
        expect(events.skip(4));
        expect(events.skip(2));
        await events.cancel();
      });
      test('同时按顺序完成多个跳过。', () async {
        var events = StreamQueue<int>(createStream());
        var skip1 = events.skip(1);
        var skip2 = events.skip(0);
        var skip3 = events.skip(4);
        var skip4 = events.skip(1);
        var index = 0;
        Func1Required<int?> sequence(expectedValue, sequenceIndex) => (value) {
              expect(value);
              expect(index);
              index++;
              return null;
            };
        await Future.wait([
          skip1.then(sequence(0, 0)),
          skip2.then(sequence(0, 1)),
          skip3.then(sequence(1, 2)),
          skip4.then(sequence(1, 3))
        ]);
        await events.cancel();
      });
    });

    group('StreamQueue<int>().take()接受操作', () {
      test('StreamQueue<int>().take() 对事件的简单理解', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.take(2));
        expect(await events.next);
        await events.cancel();
      });

      test('共0个事件', () async {
        var events = StreamQueue<int>(createStream());
        expect(events.take(0));
        expect(events.next);
        expect(events.take(0));
        expect(events.next);
        expect(events.take(0));
        expect(events.next);
        expect(events.take(0));
        expect(events.next);
        expect(events.take(0));
        expect(events.take(5));
        expect(events.next);
        await events.cancel();
      });

      test('带有错误论点的throws', () async {
        var events = StreamQueue<int>(createStream());
        expect(() => events.take(-1));
        expect(await events.next);
        expect(() => events.take(-1));
        expect(await events.next);
        await events.cancel();
      });

      test('太多的争论', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.take(6));
        await events.cancel();
      });

      test('过大', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.next);
        expect(await events.take(6));
        await events.cancel();
      });

      test('错误', () async {
        var events = StreamQueue<int>(createErrorStream());
        expect(events.take(4));
        expect(await events.next);
        await events.cancel();
      });
    });

    group('StreamQueue<int>().rest休息操作', () {
      test('StreamQueue<int>().rest 在单个next之后', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.rest.toList());
      });

      test('开始时', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.rest.toList());
      });

      test('在最后', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.next);
        expect(await events.next);
        expect(await events.next);
        expect(await events.rest.toList());
      });

      test('结束后', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.next);
        expect(await events.next);
        expect(await events.next);
        expect(events.next);
        expect(await events.rest.toList());
      });

      test('收到请求后', () async {
        var events = StreamQueue<int>(createStream());
        var next1 = events.next;
        var next2 = events.next;
        var next3 = events.next;
        var rest = events.rest;
        for (var i = 0; i < 10; i++) {
          await flushMicrotasks();
        }
        expect(await next1);
        expect(await next2);
        expect(await next3);
        expect(await rest.toList());
      });

      test('带有错误事件错误', () async {
        var events = StreamQueue<int>(createErrorStream());
        expect(await events.next);
        var rest = events.rest;
        var events2 = StreamQueue(rest);
        expect(await events2.next);
        expect(events2.next);
        expect(await events2.next);
      });

      test('关闭事件，阻止其他操作', () async {
        var events = StreamQueue<int>(createStream());
        var stream = events.rest;
        expect(() => events.next);
        expect(() => events.skip(1));
        expect(() => events.take(1));
        expect(() => events.rest);
        expect(() => events.cancel());
        expect(stream.toList());
      });

      test('转发到基础流', () async {
        var cancel = Completer();
        var controller = StreamController<int>(onCancel: () => cancel.future);
        var events = StreamQueue<int>(controller.stream);
        expect(controller.hasListener);
        var next = events.next;
        expect(controller.hasListener);
        expect(controller.isPaused);

        controller.add(1);
        expect(await next);
        expect(controller.isPaused);

        var rest = events.rest;
        var subscription = rest.listen(null);
        expect(controller.hasListener);
        expect(controller.isPaused);

        dynamic lastEvent;
        subscription.onData((value) => lastEvent = value);

        controller.add(2);

        await flushMicrotasks();
        expect(lastEvent);
        expect(controller.hasListener);
        expect(controller.isPaused);

        subscription.pause();
        expect(controller.isPaused);

        controller.add(3);

        await flushMicrotasks();
        expect(lastEvent);
        subscription.resume();

        await flushMicrotasks();
        expect(lastEvent);

        var cancelFuture = subscription.cancel();
        expect(controller.hasListener);
        cancel.complete(42);
        expect(cancelFuture);
      });
    });

    group('StreamQueue<int>().peek窥视操作', () {
      test('StreamQueue<int>().peek 偷看一个事件', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.peek);
        expect(await events.next);
        expect(await events.peek);
        expect(await events.take(2));
        expect(await events.peek);
        expect(await events.next);
        expect(events.peek);
        await events.cancel();
      });
      test('同时有多个请求', () async {
        var events = StreamQueue<int>(createStream());
        var result = await Future.wait(
            [events.peek, events.peek, events.next, events.peek, events.peek]);
        expect(result);
        await events.cancel();
      });
      test('有错误的请求序列', () async {
        var events = StreamQueue<int>(createErrorStream());
        expect(await events.next);
        expect(await events.next);
        expect(events.peek);
        expect(events.peek);
        expect(events.next);
        expect(await events.next);
        await events.cancel();
      });
    });

    group('StreamQueue<int>().cancel取消操作', () {
      test('StreamQueue<int>().cancel 关闭事件，阻止任何其他操作', () async {
        var events = StreamQueue<int>(createStream());
        await events.cancel();
        expect(() => events.lookAhead(1));
        expect(() => events.next);
        expect(() => events.peek);
        expect(() => events.skip(1));
        expect(() => events.take(1));
        expect(() => events.rest);
        expect(() => events.cancel());
      });

      test('在任何事件之前调用时取消基础订阅', () async {
        var cancelFuture = Future.value(42);
        var controller = StreamController<int>(onCancel: () => cancelFuture);
        var events = StreamQueue<int>(controller.stream);
        expect(await events.cancel());
      });

      test('取消基础订阅，返回结果', () async {
        var cancelFuture = Future.value(42);
        var controller = StreamController<int>(onCancel: () => cancelFuture);
        var events = StreamQueue<int>(controller.stream);
        controller.add(1);
        expect(await events.next);
        expect(await events.cancel());
      });

      test('StreamQueue<int>().cancel(immediate: true)关闭事件，阻止任何其他操作', () async {
        var events = StreamQueue<int>(createStream());
        await events.cancel(immediate: true);
        expect(() => events.next);
        expect(() => events.skip(1));
        expect(() => events.take(1));
        expect(() => events.rest);
        expect(() => events.cancel());
      });

      test('StreamQueue<int>().cancel(immediate: true)立即取消基础订阅', () async {
        var controller = StreamController<int>();
        controller.add(1);

        var events = StreamQueue<int>(controller.stream);
        expect(await events.next);
        expect(controller.hasListener);

        await events.cancel(immediate: true);
        expect(controller.hasListener);
      });

      test('StreamQueue<int>().cancel(immediate: true)在任何事件之前调用时取消基础订阅',
          () async {
        var cancelFuture = Future.value(42);
        var controller = StreamController<int>(onCancel: () => cancelFuture);

        var events = StreamQueue<int>(controller.stream);
        expect(await events.cancel(immediate: true));
      });

      test('StreamQueue<int>().cancel(immediate: true)关闭挂起的请求', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(events.next);
        expect(events.hasNext);

        await events.cancel(immediate: true);
      });

      test('StreamQueue<int>().cancel(immediate: true)返回关闭基础订阅的结果', () async {
        var controller =
            StreamController<int>(onCancel: () => Future<int>.value(42));
        var events = StreamQueue<int>(controller.stream);
        expect(await events.cancel(immediate: true));
      });

      test("StreamQueue<int>().cancel(immediate: true)侦听，然后取消尚未侦听的流", () async {
        var wasListened = false;
        var controller =
            StreamController<int>(onListen: () => wasListened = true);
        var events = StreamQueue<int>(controller.stream);
        expect(wasListened);
        expect(controller.hasListener);

        await events.cancel(immediate: true);
        expect(wasListened);
        expect(controller.hasListener);
      });
    });

    group('StreamQueue<int>().hasNext检测是否有下一位操作', () {
      test('StreamQueue<int>().hasNext 开始时为true', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.hasNext);
      });

      test('启动后为true', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.hasNext);
      });

      test('最后为true', () async {
        var events = StreamQueue<int>(createStream());
        for (var i = 1; i <= 4; i++) {
          expect(await events.next);
        }
        expect(await events.hasNext);
      });

      test('排队时为true', () async {
        var events = StreamQueue<int>(createStream());
        var values = <int>[];
        for (var i = 1; i <= 3; i++) {
          events.next.then(values.add);
        }
        expect(values);
        expect(await events.hasNext);
        expect(values);
      });

      test('排队时为false', () async {
        var events = StreamQueue<int>(createStream());
        var values = <int>[];
        for (var i = 1; i <= 4; i++) {
          events.next.then(values.add);
        }
        expect(values);
        expect(await events.hasNext);
        expect(values);
      });

      test('数据事件时为true', () async {
        var controller = StreamController<int>();
        var events = StreamQueue<int>(controller.stream);

        bool? hasNext;
        events.hasNext.then((result) {
          hasNext = result;
        });
        await flushMicrotasks();
        expect(hasNext);
        controller.add(42);
        expect(hasNext);
        await flushMicrotasks();
        expect(hasNext);
      });

      test('错误事件时为true', () async {
        var controller = StreamController<int>();
        var events = StreamQueue<int>(controller.stream);

        bool? hasNext;
        events.hasNext.then((result) {
          hasNext = result;
        });
        await flushMicrotasks();
        expect(hasNext);
        controller.addError('BAD');
        expect(hasNext);
        await flushMicrotasks();
        expect(hasNext);
        expect(events.next);
      });

      test('-下一位之后下一位', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.hasNext);
        expect(await events.hasNext);
        expect(await events.next);
        expect(await events.hasNext);
        expect(await events.hasNext);
        expect(await events.next);
        expect(await events.hasNext);
        expect(await events.hasNext);
        expect(await events.next);
        expect(await events.hasNext);
        expect(await events.hasNext);
        expect(await events.next);
        expect(await events.hasNext);
        expect(await events.hasNext);
      });

      test('-true之后的下一个', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.hasNext);
        expect(await events.next);
        expect(await events.next);
      });

      test('-在true之后的下一个，已排队', () async {
        var events = StreamQueue<int>(createStream());
        var responses = <Object>[];
        events.next.then(responses.add);
        events.hasNext.then(responses.add);
        events.next.then(responses.add);
        do {
          await flushMicrotasks();
        } while (responses.length < 3);
        expect(responses);
      });

      test('-在true之后跳过0', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.hasNext);
        expect(await events.skip(0));
        expect(await events.next);
      });

      test('-在true之后跳过1', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.hasNext);
        expect(await events.skip(1));
        expect(await events.next);
      });

      test('-在true之后跳过2', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.hasNext);
        expect(await events.skip(2));
        expect(await events.next);
      });

      test('-在true之后接受0', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.hasNext);
        expect(await events.take(0));
        expect(await events.next);
      });

      test('-在true之后接受1', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.hasNext);
        expect(await events.take(1));
        expect(await events.next);
      });

      test('-在true之后接受2', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.hasNext);
        expect(await events.take(2));
        expect(await events.next);
      });

      test('-在true之后休息', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.hasNext);
        var stream = events.rest;
        expect(await stream.toList());
      });

      test('-在true之后休息,终于', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.next);
        expect(await events.next);
        expect(await events.hasNext);
        var stream = events.rest;
        expect(await stream.toList());
      });

      test('-在false之后休息', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.next);
        expect(await events.next);
        expect(await events.next);
        expect(await events.hasNext);
        var stream = events.rest;
        expect(await stream.toList());
      });

      test('-数据为true后取消', () async {
        var events = StreamQueue<int>(createStream());
        expect(await events.next);
        expect(await events.next);
        expect(await events.hasNext);
        expect(await events.cancel());
      });

      test('-出现错误时在true之后取消', () async {
        var events = StreamQueue<int>(createErrorStream());
        expect(await events.next);
        expect(await events.next);
        expect(await events.hasNext);
        expect(await events.cancel());
      });
    });

    group('StreamQueue().startTransaction()操作生成的事务', () {
      late StreamQueue<int> events;
      late StreamQueueTransaction<int> transaction;
      late StreamQueue<int> queue1;
      late StreamQueue<int> queue2;
      setUp() async {
        events = StreamQueue(createStream());
        expect(await events.next);
        transaction = events.startTransaction();
        queue1 = transaction.newQueue();
        queue2 = transaction.newQueue();
      }

      test('StreamQueue().startTransaction() 发出队列,独立发射事件', () async {
        await setUp();
        expect(await queue1.next);
        expect(await queue2.next);
        expect(await queue2.next);
        expect(await queue1.next);
        expect(await queue1.next);
        expect(await queue2.next);
        expect(await queue1.hasNext);
        expect(await queue2.hasNext);
      });

      test('发出队列,对事件的请求进行排队', () async {
        await setUp();
        expect(await queue1.next);
        expect(await queue2.next);
        expect(await queue2.next);
        expect(await queue1.next);
        expect(await queue1.next);
        expect(await queue2.next);
        expect(await queue1.hasNext);
        expect(await queue2.hasNext);
      });

      test('发出队列,独立发出错误', () async {
        await setUp();
        events = StreamQueue(createErrorStream());
        expect(await events.next);
        transaction = events.startTransaction();
        queue1 = transaction.newQueue();
        queue2 = transaction.newQueue();

        expect(queue1.next);
        expect(queue2.next);
        expect(queue2.next);
        expect(queue1.next);
        expect(queue1.next);
        expect(queue2.next);
        expect(queue1.hasNext);
        expect(queue2.hasNext);
      });

      test('被拒绝时,其他原始请求使用以前的状态', () async {
        await setUp();
        expect(await queue1.next);
        expect(await queue2.next);
        expect(await queue2.next);

        await flushMicrotasks();
        transaction.reject();

        expect(await events.next);
        expect(await events.next);
        expect(await events.next);
        expect(await events.hasNext);
      });

      test('被拒绝时,挂起的原始请求使用以前的状态', () async {
        await setUp();
        expect(await queue1.next);
        expect(await queue2.next);
        expect(await queue2.next);
        expect(events.next);
        expect(events.next);
        expect(events.next);
        expect(events.hasNext);

        await flushMicrotasks();
        transaction.reject();
      });

      test('被拒绝时,进一步的子请求就像流已关闭一样', () async {
        await setUp();
        expect(await queue1.next);
        transaction.reject();

        expect(await queue1.hasNext);
        expect(queue1.next);
      });

      test('被拒绝时,挂起的子请求就像流已关闭一样', () async {
        await setUp();
        expect(await queue1.next);
        expect(queue1.hasNext);
        expect(queue1.next);
        transaction.reject();
      });

      test('被拒绝时,挂起的子rest请求不再发出事件', () async {
        await setUp();
        var controller = StreamController();
        var events = StreamQueue(controller.stream);
        var transaction = events.startTransaction();
        var queue = transaction.newQueue();

        queue.rest.listen((_) {}, onDone: (() {}));

        controller.add(1);
        controller.add(2);
        controller.add(3);
        await flushMicrotasks();

        transaction.reject();
        await flushMicrotasks();

        controller.add(4);
        controller.add(5);
        expect(await events.next);
      });

      test("被拒绝时,子请求的cancel（）仍然可以显式调用", () async {
        await setUp();
        transaction.reject();
        await queue1.cancel();
        expect(() => queue1.cancel());
      });

      test('被拒绝时,对commit（）或reject（）的调用失败', () async {
        await setUp();
        transaction.reject();
        expect(transaction.reject);
        expect(() => transaction.commit(queue1));
      });

      test('被拒绝时,在事务发出任何事件之前,不执行任何操作', () async {
        await setUp();
        var controller = StreamController();
        var events = StreamQueue(controller.stream);

        expect(events.next);
        events.startTransaction().reject();
        expect(events.next);

        await flushMicrotasks();
        controller.add(1);
        await flushMicrotasks();
        controller.add(2);
        await flushMicrotasks();
        controller.close();
      });

      test('被拒绝时,可以拒绝一个事务，其中一个副本在事务中被完全消耗，并且生成了第二个副本', () async {
        await setUp();
        final queue = StreamQueue(Stream.fromIterable([0]));
        final transaction = queue.startTransaction();

        final copy1 = transaction.newQueue();
        final inner1 = copy1.startTransaction();
        final innerCopy1 = inner1.newQueue();
        await innerCopy1.next;

        transaction.newQueue();

        transaction.reject();
        expect(await queue.next);
        expect(await queue.hasNext);
      });

      test('提交时,其他原始请求使用提交状态', () async {
        await setUp();
        expect(await queue1.next);
        await flushMicrotasks();
        transaction.commit(queue1);
        expect(await events.next);
      });

      test('提交时,挂起的原始请求使用已提交状态', () async {
        await setUp();
        expect(await queue1.next);
        expect(events.next);
        await flushMicrotasks();
        transaction.commit(queue1);
      });

      test('提交时,进一步的子请求就像流已关闭一样', () async {
        await setUp();
        expect(await queue2.next);
        transaction.commit(queue2);

        expect(await queue1.hasNext);
        expect(queue1.next);
      });

      test('提交时,挂起的子请求就像流已关闭一样', () async {
        await setUp();
        expect(await queue2.next);
        expect(queue1.hasNext);
        expect(queue1.next);
        transaction.commit(queue2);
      });

      test('提交时,进一步的请求就像流被关闭一样', () async {
        await setUp();
        expect(await queue1.next);
        transaction.commit(queue1);

        expect(await queue1.hasNext);
        expect(queue1.next);
      });

      test('提交时,cancel()仍然可以显式调用', () async {
        await setUp();
        expect(await queue1.next);
        transaction.commit(queue1);
        await queue1.cancel();
      });

      test('提交时,如果存在挂起的请求，则抛出', () async {
        await setUp();
        expect(await queue1.next);
        expect(queue1.hasNext);
        expect(() => transaction.commit(queue1));
      });

      test('提交时,对commit()或reject()的调用失败', () async {
        await setUp();
        transaction.commit(queue1);
        expect(transaction.reject);
        expect(() => transaction.commit(queue1));
      });

      test('提交时,在事务发出任何事件之前，不执行任何操作', () async {
        await setUp();
        var controller = StreamController();
        var events = StreamQueue(controller.stream);

        expect(events.next);
        var transaction = events.startTransaction();
        transaction.commit(transaction.newQueue());
        expect(events.next);

        await flushMicrotasks();
        controller.add(1);
        await flushMicrotasks();
        controller.add(2);
        await flushMicrotasks();
        controller.close();
      });
    });

    group('StreamQueue().withTransaction()带事务处理操作', () {
      late StreamQueue<int> events;
      setUp() async {
        events = StreamQueue(createStream());
        expect(await events.next);
      }

      test('StreamQueue().withTransaction() 传递父队列的副本', () async {
        setUp();
        await events.withTransaction((queue) async {
          expect(await queue.next);
          expect(await queue.next);
          expect(await queue.next);
          expect(await queue.hasNext);
          return true;
        });
      });

      test('如果返回true，则父队列从子位置继续', () async {
        setUp();
        await events.withTransaction((queue) async {
          expect(await queue.next);
          return true;
        });

        expect(await events.next);
      });

      test('如果返回false，父队列将从其原始位置继续', () async {
        setUp();
        await events.withTransaction((queue) async {
          expect(await queue.next);
          return false;
        });

        expect(await events.next);
      });

      test('如果抛出，父队列将从子位置继续', () {
        setUp();
        expect(events.withTransaction((queue) async {
          expect(await queue.next);
          throw 'oh no';
        }));

        expect(events.next);
      });

      test('返回事务是否成功', () {
        expect(events.withTransaction((_) async => true));
        expect(events.withTransaction((_) async => false));
      });
    });

    group('StreamQueue().cancelable()可取消操作', () {
      late StreamQueue<int> events;
      setUp() async {
        events = StreamQueue(createStream());
        expect(await events.next);
      }

      test('StreamQueue().cancelable() 传递父队列的副本', () async {
        setUp();
        await events.cancelable((queue) async {
          expect(await queue.next);
          expect(await queue.next);
          expect(await queue.next);
          expect(await queue.hasNext);
        }).value;
      });

      test('默认情况下，父队列从子位置继续', () async {
        setUp();
        await events.cancelable((queue) async {
          expect(await queue.next);
        }).value;

        expect(await events.next);
      });

      test('如果抛出错误，父队列将从子位置继续', () async {
        setUp();
        expect(
            events.cancelable((queue) async {
              expect(await queue.next);
              throw 'oh no';
            }).value);

        expect(events.next);
      });

      test('如果取消，父队列将从原始位置继续', () async {
        setUp();
        var operation = events.cancelable((queue) async {
          expect(await queue.next);
        });
        operation.cancel();

        expect(await events.next);
      });

      test('转发回调中的值', () async {
        setUp();
        expect(
            await events.cancelable((queue) async {
              expect(await queue.next);
              return 'value';
            }).value);
      });
    });

    test('所有组合顺序跳过/下一步/执行操作skip/next/take operations', () async {
      var eventCount = 20 * (3 * 3 + 1);
      var events = StreamQueue<int>(createLongStream(eventCount));

      void nextTest(int startIndex) {
        for (var i = 0; i < 10; i++) {
          expect(events.next);
        }
      }

      void skipTest(startIndex) {
        expect(events.skip(10));
      }

      void takeTest(int startIndex) {
        expect(events.take(10));
      }

      var tests = [nextTest, skipTest, takeTest];

      var counter = 0;
      for (var i = 0; i < tests.length; i++) {
        for (var j = 0; j < tests.length; j++) {
          tests[i](counter);
          tests[j](counter + 10);
          counter += 20;
        }
      }
      expect(events.rest.toList());
    });
  }
}

typedef Func1Required<T> = T Function(T value);

Stream<int> createStream() async* {
  yield 1;
  await flushMicrotasks();
  yield 2;
  await flushMicrotasks();
  yield 3;
  await flushMicrotasks();
  yield 4;
}

Stream<int> createErrorStream() {
  var controller = StreamController<int>();
  () async {
    controller.add(1);
    await flushMicrotasks();
    controller.add(2);
    await flushMicrotasks();
    controller.addError('To err is divine!');
    await flushMicrotasks();
    controller.add(4);
    await flushMicrotasks();
    controller.close();
  }();
  return controller.stream;
}

Stream<int> createLongStream(int eventCount) async* {
  for (var i = 0; i < eventCount; i++) {
    yield i;
  }
}
